package cn.cup.dmp.report.MediaAnalysis

import java.util.Properties

import cn.cup.dmp.config.ConfigHelper
import cn.cup.dmp.utils.RptKpiHandler
import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark batch job that builds the per-application (media) KPI report.
  *
  * Steps performed by [[MediaAnalysis.main]]:
  *  1. Read the parquet data set located at `ConfigHelper.destPath`.
  *  2. Load a tab-separated dictionary file and broadcast it as a map
  *     (field index 4 -> field index 1; presumably app id -> app name,
  *     TODO confirm against the dictionary file layout).
  *  3. For every row, resolve the application name and compute the KPI list
  *     through `RptKpiHandler.rptKpi`.
  *  4. Sum the KPI lists element by element per application name.
  *  5. Write the result to the JDBC table `t_app_count_analysis`.
  */
object MediaAnalysis {

  /**
    * Job entry point.
    *
    * @param args command-line arguments; currently not used
    */
  def main(args: Array[String]): Unit = {

    // NOTE(review): the master is hard-coded to local mode; confirm whether it
    // should be supplied by spark-submit instead.
    val conf = new SparkConf().setAppName("媒体报表分析").setMaster("local[*]")
    conf.set("spark.serializer", ConfigHelper.ser)
    val sc = new SparkContext(conf)

    // Everything that uses the context runs inside try/finally so the
    // SparkContext is stopped even when reading, aggregating or writing fails.
    try {
      val sqlContext = new SQLContext(sc)
      val source: DataFrame = sqlContext.read.parquet(ConfigHelper.destPath)

      // Broadcast variable: the dictionary is collected to the driver as a Map
      // (fast key lookup) and shipped to the executors once.
      // NOTE(review): the dictionary path is a hard-coded local Windows path.
      val dictLines = sc.textFile("D:\\小牛视频\\资料_广告项目\\app_dict.txt")
      val dictPairs: RDD[(String, String)] = dictLines
        .map(_.split("\t", -1)) // limit -1 keeps trailing empty fields
        .filter(_.length >= 5)  // field index 4 must exist
        .map(fields => (fields(4), fields(1)))
      val appDict = sc.broadcast(dictPairs.collectAsMap())

      // Split and filter the data: one (appName, kpiList) pair per input row.
      // `.rdd` makes the RDD conversion explicit instead of relying on
      // DataFrame.map returning an RDD.
      val appData = source.rdd.map { row =>
        val declaredName = row.getAs[String]("appname")
        val appId = row.getAs[String]("appid")
        // If appname is empty, look it up in the dictionary by appid;
        // if both are empty (or the id is not in the dictionary) use "未知".
        val appName =
          if (!StringUtils.isEmpty(declaredName)) declaredName
          else if (StringUtils.isEmpty(appId)) "未知"
          else appDict.value.getOrElse(appId, "未知")
        (appName, RptKpiHandler.rptKpi(row))
      }

      // Aggregate by app name: KPI lists are summed position by position.
      // `zip` stops at the shorter list, so the lists are expected to have
      // equal length.
      val aggregated = appData.reduceByKey { (left, right) =>
        left.zip(right).map { case (a, b) => a + b }
      }

      // Convert to a DataFrame so the result can be stored in the database.
      // Positions 5 and 6 keep their original numeric type; all other KPI
      // positions are truncated to Int.
      import sqlContext.implicits._
      val frame = aggregated
        .map { case (appName, kpis) =>
          (appName, kpis(0).toInt, kpis(1).toInt, kpis(2).toInt, kpis(3).toInt, kpis(4).toInt,
            kpis(5), kpis(6), kpis(7).toInt, kpis(8).toInt)
        }
        .toDF("appName", "original_request_number", "valid_requests", "meet_condition_request",
          "partitionbidsNum", "success_bid_Num", "everytime_consump", "everytime_cost", "show_times", "click_times")

      // Write to the database.
      // NOTE(review): no save mode is set, so the writer's default applies;
      // confirm the intended behaviour when the table already exists.
      val connectionProperties = new Properties()
      connectionProperties.setProperty("user", ConfigHelper.username)
      connectionProperties.setProperty("password", ConfigHelper.password)
      frame.write.jdbc(ConfigHelper.url, "t_app_count_analysis", connectionProperties)
    } finally {
      sc.stop()
    }
  }

}
